package com.paic.common.middleware.util

import java.text.SimpleDateFormat
import java.util.Date

import scala.util.control.NonFatal

import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration

/**
  * Created by vito on 2017/8/2.
  */
/**
  * Command-line tool that recovers HDFS leases on files left open-for-write
  * under a given directory, and deletes the files whose lease could not be
  * recovered. Usage (all arguments optional, positional):
  *
  *   args(0) - active NameNode host (default: hard-coded host below)
  *   args(1) - active NameNode port (default: 9000)
  *   args(2) - HDFS directory to scan (default: "/")
  *   args(3) - date string; paths containing it are skipped (default: today, "yyyy/MM/dd")
  */
object HdfsRecoverLease {
  /* Scala-style getter/setter: host of the currently active NameNode. */
  private[this] var _activeNameNodeHost: String = "10.29.180.177"
  def activeNameNodeHost: String = _activeNameNodeHost
  def activeNameNodeHost_=(value: String): Unit = {
    _activeNameNodeHost = value
  }

  /* Scala-style getter/setter: RPC port of the currently active NameNode. */
  private[this] var _activeNameNodePort: Int = 9000
  def activeNameNodePort: Int = _activeNameNodePort
  def activeNameNodePort_=(value: Int): Unit = {
    _activeNameNodePort = value
  }

  /* Scala-style getter/setter: "hdfs://host:port" prefix built by initialize(). */
  private[this] var _hdfsPrefix: String = ""
  def hdfsPrefix: String = _hdfsPrefix
  def hdfsPrefix_=(value: String): Unit = {
    _hdfsPrefix = value
  }

  /* HDFS directory whose open-for-write files should have their lease recovered. */
  var recoverDirPath = "/"

  /* Date string; file paths containing this date are skipped during recovery. */
  var currentDate = ""

  /* Hadoop configuration object required by the HDFS API helpers. */
  val conf = new Configuration()

  /* Files whose lease recovery failed (candidates for deletion). */
  var failRecoverFilesSet: Set[String] = Set()

  /* Total number of files that needed lease recovery. */
  var totalRecoverFilesCount = 0

  /* Number of files whose lease was recovered successfully. */
  var successRecoverFilesCount = 0

  /* Number of files whose lease recovery failed. */
  var failRecoverFilesCount = 0

  /* Files that could not be deleted. */
  var failDeleteFilesSet: Set[String] = Set()

  /* Number of files deleted successfully. */
  var successDeleteFilesCount = 0

  /* Number of files whose deletion failed. */
  var failDeleteFilesCount = 0

  /**
    * Populates the mutable configuration above from the command-line
    * arguments, falling back to the defaults for any missing/empty argument.
    * NOTE(review): args(1).toInt throws NumberFormatException on a non-numeric
    * port — preserved from the original; the tool fails fast on bad input.
    */
  private def initialize(args: Array[String]): Unit = {
    /* Active NameNode host and port, when supplied. */
    if (args.length > 0 && StringUtils.isNotEmpty(args(0))) {
      activeNameNodeHost = args(0)
    }
    if (args.length > 1 && StringUtils.isNotEmpty(args(1))) {
      activeNameNodePort = args(1).toInt
    }

    hdfsPrefix = s"hdfs://$activeNameNodeHost:$activeNameNodePort"

    /* Directory to recover: the supplied path, or the root directory by default. */
    recoverDirPath = if (args.length > 2 && StringUtils.isNotEmpty(args(2))) args(2) else "/"

    /* Date to ignore: the supplied date, or today's date by default. */
    currentDate = if (args.length > 3 && StringUtils.isNotEmpty(args(3))) args(3) else getCurrentDate
  }

  /** Prints a summary of the recovery and deletion counters. */
  def printResult(): Unit = {
    println("*****************************************")
    println("Hdfs Recover Lease done.")
    println("Total Recover files count : " + totalRecoverFilesCount)
    println("Success Recover count : " + successRecoverFilesCount)
    println("Fail Recover count : " + failRecoverFilesCount)
    println("Success delete count : " + successDeleteFilesCount)
    println("Fail delete count : " + failDeleteFilesCount)
    println("*****************************************")
  }

  /** Today's date formatted as "yyyy/MM/dd" (matches date-partitioned HDFS paths). */
  def getCurrentDate: String = {
    /* SimpleDateFormat is not thread-safe; a fresh instance per call is safe here. */
    val simpleDateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd")
    simpleDateFormat.format(new Date())
  }

  /**
    * Attempts lease recovery on every file in `filesSet`, skipping paths that
    * contain `currentDate` (files legitimately still being written today).
    * Updates the recovery counters and collects failures in failRecoverFilesSet.
    */
  private def recoverFiles(filesSet: Set[String]): Unit = {
    for (filePath <- filesSet if !filePath.contains(currentDate)) {
      /* The listed paths lack the NameNode authority; prepend the hdfs:// prefix. */
      totalRecoverFilesCount += 1
      try {
        println("Recovering " + hdfsPrefix + filePath)
        HDFSUtil.recoverLease(hdfsPrefix + filePath)
        successRecoverFilesCount += 1
      } catch {
        /* NonFatal: let VM errors and interrupts propagate instead of counting them. */
        case NonFatal(ex) =>
          println(ex.getMessage)
          failRecoverFilesSet += filePath
          failRecoverFilesCount += 1
      }
    }
  }

  /**
    * Deletes the files whose lease could not be recovered. Deletion is
    * non-recursive on purpose, so a directory can never be removed by mistake.
    * Updates the deletion counters and collects failures in failDeleteFilesSet.
    */
  private def deleteUnrecoverableFiles(): Unit = {
    for (filePath <- failRecoverFilesSet) {
      try {
        if (!HDFSUtil.deleteFile(conf, hdfsPrefix + filePath, false)) {
          println("Delete file error : " + hdfsPrefix + filePath)
          failDeleteFilesSet += filePath
          failDeleteFilesCount += 1
        } else {
          successDeleteFilesCount += 1
        }
      } catch {
        /* NonFatal: let VM errors and interrupts propagate instead of counting them. */
        case NonFatal(ex) =>
          println(ex.getMessage)
          failDeleteFilesSet += filePath
          failDeleteFilesCount += 1
      }
    }
  }

  def main(args: Array[String]): Unit = {

    initialize(args)

    /* Verify the target path exists in HDFS before doing anything. */
    if (!HDFSUtil.exits(conf, hdfsPrefix + recoverDirPath)) {
      // Fixed typo in the original message ("dose" -> "does").
      println("Path does not exist in HDFS : " + hdfsPrefix + recoverDirPath)
    } else {
      /* List of open-for-write files; the helper returns a java.util.Set. */
      val javaFilesSet = HDFSUtil.getOpenforwriteFileList(hdfsPrefix + recoverDirPath)
      if (null == javaFilesSet || javaFilesSet.isEmpty) {
        println("No files need to recover lease : " + hdfsPrefix + recoverDirPath)
      } else {
        /* Convert java.util.Set to scala.collection.immutable.Set. */
        import scala.collection.JavaConverters._
        recoverFiles(javaFilesSet.asScala.toSet)

        /* Files that refuse lease recovery are deleted (non-recursively). */
        deleteUnrecoverableFiles()

        /* Summarize and print the results. */
        printResult()
      }
    }
  }

}
